使用 PySpark 处理数据笔记

调用 Spark 能力处理数据

  1. 调用 Master 几种模式:

    • local

    • local[K]

    • local[*]

    • spark://host:port

    • yarn

      sparkSession >unfolded
      1
      2
      3
      4
      5
      6
      7
      # 调用 Standalone 模式下的集群能力,同时设置了 worker 的内存使用配置
      spark = SparkSession \
      .builder \
      .appName("C3Analysis") \
      .master("spark://hn-manager:7077") \
      .config("spark.executor.memory", "13G") \
      .getOrCreate()
  2. 在读取数据时,分为本地数据读取和 HDFS 数据读取。需要本地读取数据那么需要通过 file://file_path 的方式,可以直接添加目录,读取目录中的数据

  3. hdfs 管理,使用 HDFS 管理文件目录时,需要注意使用相关的用户权限

  4. 环境变量添加与修改 在使用 PySpark 功能时,存在需要修改环境变量 PPYSPARK_PYTHONPYSPARK_DRIVER_PYTHON 为临时环境值。该问题的解决方案是修改 os package 中的 environ 实例:

    1
    2
    3
    import os
    os.environ["PYSPARK_PYTHON"]="/data/Anaconda3/bin/python"
    os.environ["PYSPARK_DRIVER_PYTHON"]="/data/Anaconda3/bin/python"
  5. PySpark 退出服务 在运行完 PySpark 程序之后,需要关闭 Spark 的话需要使用 SparkContextstop() 方法关闭或者使用 System.exit(0) 命令或者 sys.exit() 方法退出应用

  6. SparkDataFrame 增加行索引 使用 F.monotonically_increasing_id() 方法可以增加行索引,但是得到的索引值并不是显性排序的——得到的结果的索引和是否为单机、数据是否分为了多个 parlition 等原因相关。命令方式如下: data = data.select(F.monotonically_increasing_id().alias("index"), "*")

  7. 使用定制化的 Java 方法 思路上使用 Py4J 实现 Python 中数据和 JVM 通信的方式,而在 PySpark 已经支持了 JVM。因此重要的节点上实现 Java 中的类被加载在 PySpark shell中,实现的方法是:

    1. 在启动 PySpark 阶段需要添加上包含了类的 JAR 包,eg: pyspark --master local --driver-class-path <jar 包路径> 命令可以在单机模式下将添加 jar 包(在 Spark 2.4.0 版本中使用 --jars 参数方式替换 --driver-class-path 也是可行的)
    2. SparkContext 对象的 jvm 调用相关 Java 类方法,例如命令 sc._jvm.com.python.test.TestDem0001.concatStatic('1', 'a', 'b') 是表示调用 com.python.test 的 package 下 TestDem0001 类的 concatStatic 方法
  8. PySpark 中的 SparkDataFrame 字段名称是没有区分大小写的,例如 df.select("ITV_accounT").head(3)df.select("itv_account").head(3) 都可以得到结果

    example
作者

ZenRay

发布于

2020-11-30

更新于

2021-03-17

许可协议

CC BY-NC-SA 4.0